package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"github.com/IBM/sarama"
	"io/ioutil"
)

// main sends a single test message to a local Kafka broker over TLS using a
// synchronous sarama producer, then prints the partition and offset the
// message was written to (or the send error).
func main() {
	// Kafka broker addresses.
	brokerList := []string{"localhost:9092"}

	// Build the TLS configuration from the certificate, key and CA files.
	tlsConfig, err := newTLSConfig("kafka_server.crt", "kafka_server.key", "ca.crt")
	if err != nil {
		panic(err)
	}

	// Producer configuration.
	config := sarama.NewConfig()
	// Net.TLS.Config is only honoured when Net.TLS.Enable is true; without
	// it the connection to the broker is made in plaintext.
	config.Net.TLS.Enable = true
	config.Net.TLS.Config = tlsConfig
	// Required by SyncProducer so SendMessage can report the delivery result.
	config.Producer.Return.Successes = true

	// Create the producer.
	producer, err := sarama.NewSyncProducer(brokerList, config)
	if err != nil {
		panic(err)
	}
	defer func() {
		// Report a failed shutdown instead of silently dropping the error.
		if cerr := producer.Close(); cerr != nil {
			fmt.Println("Failed to close producer:", cerr)
		}
	}()

	// Message to send.
	topic := "test-topic"
	message := "Hello, Kafka with TLS!"

	// Build the producer message.
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(message),
	}

	// Send the message and report the outcome.
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		fmt.Println("Failed to send message:", err)
		return
	}
	fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
}

// newTLSConfig builds a *tls.Config from PEM files on disk.
//
// certFile and keyFile are loaded as an X.509 key pair and placed in
// Certificates; caFile is read and its PEM certificates are used as RootCAs.
// All three files are required.
//
// It returns an error if the key pair cannot be loaded, if caFile cannot be
// read, or if caFile contains no parsable PEM certificate.
func newTLSConfig(certFile, keyFile, caFile string) (*tls.Config, error) {
	// Load the certificate and its private key.
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, fmt.Errorf("loading key pair %q / %q: %w", certFile, keyFile, err)
	}

	// Read the CA certificate bundle.
	caCert, err := ioutil.ReadFile(caFile)
	if err != nil {
		return nil, fmt.Errorf("reading CA file %q: %w", caFile, err)
	}

	// Build the root pool. AppendCertsFromPEM reports false when nothing was
	// parsed; fail here rather than return an empty pool that would make
	// every later handshake fail with an unrelated verification error.
	caCertPool := x509.NewCertPool()
	if !caCertPool.AppendCertsFromPEM(caCert) {
		return nil, fmt.Errorf("no valid PEM certificates found in CA file %q", caFile)
	}

	// Assemble the TLS configuration.
	tlsConfig := &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      caCertPool,
	}

	return tlsConfig, nil
}
